/**
 * Created on April 2, 2009.
 *
 * Copyright 2010- The MITRE Corporation. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you  may not 
 * use this file except in compliance with the License. You may obtain a copy of 
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software 
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 
 * License for the specific language governing permissions andlimitations under
 * the License.
 *
 * $Id: SortKmerRevisedRelativeEntropies.java 3 2010-12-08 20:56:31Z mcolosimo@mitre.org $
 */
package org.mitre.ccv.mapred;

import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.TreeSet;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import org.mitre.ccv.mapred.io.KmerEntropyPairWritable;
import org.mitre.mapred.fs.FileUtils;

/**
 * A map-reduce class that sorts the revised relative entropies as generated by
 * {@link CalculateKmerRevisedRelativeEntropy} from highest to lowest as  KmerEntropyPairWritable:Text(k-mer) Key:Value pairs.
 *
 * <p>This supports two different output formats
 * <ul>
 * <li>Binary SequenceFile (default)</li>
 * <li>plain text (-t option or setting the binary JobConf property {@link #TEXT_OUTPUT})
 * </ul>
 * @author Marc Colosimo
 */
public class SortKmerRevisedRelativeEntropies extends Configured implements Tool {

    private static final Log LOG = LogFactory.getLog(SortKmerRevisedRelativeEntropies.class);

    public static final String TEXT_OUTPUT = "ccv.sortrre.textoutput";

    public static class ReverseSortMap extends MapReduceBase
            implements Mapper<Text, KmerEntropyPairWritable, KmerEntropyPairWritable, Text> {

        @Override
        public void map(Text kmer, KmerEntropyPairWritable value,
                OutputCollector<KmerEntropyPairWritable, Text> output,
                Reporter reporter) throws IOException {
            output.collect(value, kmer);
        }
    }

    /**
     * Start a job with the given configuration and parameters.
     *
     * <P>Setting the <code>jobConf</code> boolean property "ccv.sortrre.textoutput" results in
     * the output being text instead of the default binary {@link SequenceFile}.
     *
     * @param jobConf
     * @param input
     * @param output
     * @param cleanLogs
     * @return
     * @throws java.lang.Exception
     */
    public int initJob(JobConf jobConf, String input, String output, boolean cleanLogs) throws Exception {
        JobConf conf = new JobConf(jobConf, SortKmerRevisedRelativeEntropies.class);
        conf.setJobName("SortKmerRevisedRelativeEntropies");

        // setup mapper
        SequenceFileInputFormat.setInputPaths(conf, input);
        conf.setInputFormat(SequenceFileInputFormat.class);
        conf.setMapperClass(ReverseSortMap.class);
        conf.setOutputKeyClass(KmerEntropyPairWritable.class);         // job output key class
        conf.setOutputValueClass(Text.class);  // job output value class

        // Uses default reducer (IdentityReducer)
        if (conf.getBoolean(TEXT_OUTPUT, false)) {
            FileOutputFormat.setOutputPath(conf, new Path(output));
        } else {
            conf.setOutputFormat(SequenceFileOutputFormat.class);
            SequenceFileOutputFormat.setOutputPath(conf, new Path(output));
        }
        JobClient.runJob(conf);

        return 0;
    }

    @Override
    public int run(String[] args) throws Exception {
        JobConf conf = new JobConf(getConf());
        boolean cleanLogs = false;

        // @TODO: use commons getopts, org.apache.hadoop.util.GenericOptionsParser used it
        ArrayList<String> other_args = new ArrayList<String>();
        for (int i = 0; i < args.length; ++i) {
            try {
                if ("-m".equals(args[i])) {
                    conf.setNumMapTasks(Integer.parseInt(args[++i]));
                } else if ("-r".equals(args[i])) {
                    conf.setNumReduceTasks(Integer.parseInt(args[++i]));
                } else if ("-c".equals(args[i])) {
                    cleanLogs = true;
                } else if ("-t".equals(args[i])) {
                    conf.setBoolean(TEXT_OUTPUT, true);
                } else if ("-libjars".equals(args[i])) {
                    conf.set("tmpjars",
                            FileUtils.validateFiles(args[++i], conf));

                    URL[] libjars = FileUtils.getLibJars(conf);
                    if (libjars != null && libjars.length > 0) {
                        // Add libjars to client/tasks classpath
                        conf.setClassLoader(new URLClassLoader(libjars, conf.getClassLoader()));
                        // Adds libjars to our classpath
                        Thread.currentThread().setContextClassLoader(
                                new URLClassLoader(libjars,
                                Thread.currentThread().getContextClassLoader()));
                    }
                } else {
                    other_args.add(args[i]);
                }
            } catch (NumberFormatException except) {
                System.out.println("ERROR: Integer expected instead of " + args[i]);
                return printUsage();
            } catch (ArrayIndexOutOfBoundsException except) {
                System.out.println("ERROR: Required parameter missing from " +
                        args[i - 1]);
                return printUsage();
            }
        }
        // Make sure there are exactly 2 parameters left.
        if (other_args.size() != 2) {
            System.out.println("ERROR: Wrong number of parameters: " +
                    other_args.size() + " instead of 3.");
            return printUsage();
        }

        return initJob(conf, other_args.get(0), other_args.get(1), cleanLogs);
    }

    /**
     * Returns the given number of k-mers from {@link SequenceFile}s generated by this class.
     *
     * @param conf
     * @param input the path containing the <code>SequenceFile</code> parts.
     * @param m     the number of k-mers to return. If <= 0, then {@link Integer.MAX_VALUE} is returned.
     * @return
     * @throws java.io.IOException
     */
    static TreeSet<String> getkmers(JobConf conf, String input, Integer m) throws IOException {
        TreeSet<String> nmers = new TreeSet<String>();
        Path inputPath = new Path(input);
        FileSystem fs = inputPath.getFileSystem(conf);
        //Path inputPath = fs.makeQualified(path);
        Path[] paths = FileUtils.ls(conf, inputPath.toString() + Path.SEPARATOR + "part-*");
        if (m <= 0) {
            m = Integer.MAX_VALUE;
        }
        int cnt = 0;
        for (int idx = 0; idx < paths.length; idx++) {
            SequenceFile.Reader reader = new SequenceFile.Reader(fs, paths[idx], conf);
            KmerEntropyPairWritable key = new KmerEntropyPairWritable();
            boolean hasNext = true;
            while (hasNext && cnt < m) {
                hasNext = reader.next(key);
                nmers.add(key.getKey());
                cnt++;
            }
        }
        return nmers;
    }

    static int printUsage() {
        System.out.println("SortKmerRevisedRelativeEntropies [-libjars <classpath,...>] [-m <maps>] [-r <reduces>] [-c]" +
                " <input> <output>");
        return -1;
    }

    static public void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(),
                new SortKmerRevisedRelativeEntropies(), args);
        System.exit(res);
    }
}
